package com.ruyuan.eshop.push.mq.consumer.listener;

import com.alibaba.fastjson.JSON;
import com.ruyuan.eshop.common.concurrent.SafeThreadPool;
import com.ruyuan.eshop.common.message.PlatformMessagePushMessage;
import com.ruyuan.eshop.push.domain.dto.MessageSendDTO;
import com.ruyuan.eshop.push.service.MessageSendService;
import com.ruyuan.eshop.push.service.MessageSendServiceFactory;
import com.ruyuan.eshop.push.service.impl.FactoryProducer;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;


import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import static com.ruyuan.eshop.common.constants.RedisKey.PROMOTION_CONDITION_COUPON_KEY;

/**
 * @author zhonghuashishan
 */
@Slf4j
@Component
public class PlatFormConditionCouponListener implements MessageListenerConcurrently {

    /**
     * 消息推送工厂提供者
     */
    @Autowired
    private FactoryProducer factoryProducer;

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    /**
     * 并发消费消息
     * @param msgList
     * @param context
     * @return
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {

        try {
            // 方式一：使用默认的commonPool来处理任务
            // supplyAsync(Supplier<U> supplier) API
            // 默认使用的是 ForkJoinPool.commonPool() 这个线程池
            // 该线程池在jvm内是唯一的，默认的线程数量是cpu的核数减1
            // 如果觉得不线程数不够用可以通过jvm系统参数 java.util.concurrent.ForkJoinPool.common.parallelism 的值调整commonPool的并行度，或者采用方式二
            /*List<CompletableFuture<AltResult>> futureList = msgList.stream()
                    .map(e -> CompletableFuture.supplyAsync(() -> handleMessageExt(e)))
                    .collect(Collectors.toList());*/

            // 方式二：使用自定的业务线程池来处理任务
            List<CompletableFuture<AltResult>> futureList = msgList.stream()
                    .map(e -> CompletableFuture.supplyAsync(() -> handleMessageExt(e)))
                    .collect(Collectors.toList());

            List<Throwable> resultList = futureList.stream()
                    .map(CompletableFuture::join)
                    .filter(e -> e.ex != null)
                    .map(e -> e.ex)
                    .collect(Collectors.toList());
        } catch (Exception e){
            log.error("consume error,消费失败", e);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    private AltResult handleMessageExt(MessageExt messageExt) {
        try {
            log.debug("执行平台发送通知消息逻辑，消息内容：{}", messageExt.getBody());
            String msg = new String(messageExt.getBody());
            PlatformMessagePushMessage message = JSON.parseObject(msg , PlatformMessagePushMessage.class);

            // 幂等控制
            if (StringUtils.isNotBlank(redisTemplate.opsForValue()
                    .get(PROMOTION_CONDITION_COUPON_KEY + message.getUserAccountId()))) {
                return new AltResult(null);
            }

            // 获取消息服务工厂
            MessageSendServiceFactory messageSendServiceFactory = factoryProducer
                    .getMessageSendServiceFactory(message.getInformType());

            // 消息发送服务组件
            MessageSendService messageSendService = messageSendServiceFactory
                    .createMessageSendService();

            // 构造消息
            PlatformMessagePushMessage messagePushMessage = PlatformMessagePushMessage.builder()
                    .informType(message.getInformType())
                    .mainMessage(message.getMainMessage())
                    .userAccountId(message.getUserAccountId())
                    .message(message.getMessage())
                    .build();

            MessageSendDTO messageSendDTO = messageSendServiceFactory.createMessageSendDTO(messagePushMessage);

            messageSendService.send(messageSendDTO);

            // 发送成功之后把已经发送成功记录到redis
            redisTemplate.opsForValue().set(PROMOTION_CONDITION_COUPON_KEY + message.getUserAccountId(), UUID.randomUUID().toString());
            log.info("消息推送完成，messageSendDTO:{}", messageSendDTO);

            return new AltResult(null);
        } catch (Exception e) {
            return new AltResult(e);
        }
    }

    /**
     * completableFuture的返回结果，适用于无返回值的情况
     * ex字段为null表示任务执行成功
     * ex字段不为null表示任务执行失败，并把异常设置为ex字段
     */
    private static class AltResult {

        final Throwable ex;

        public AltResult(Throwable ex) {
            this.ex = ex;
        }
    }

}
